Introduction to Machine Learning Pipeline Workshop

This notebook requires Spark 2.2+ to be pip installed in your environment.

This workshop takes you through building a simple decision tree model using Spark's ML pipeline interface.

The different algorithm options are described in http://spark.apache.org/docs/latest/api/python/pyspark.ml.html and the Spark ML Pipelines documentation is at http://spark.apache.org/docs/latest/ml-pipeline.html

If you're looking for Spark books, why not consider Learning Spark or High Performance Spark (co-authored by the author of this notebook) -- http://amzn.to/2etDd0L

Need more than the algorithms available in Spark? Or just have some time to kill? Why not join us at https://github.com/sparklingpandas/sparklingml and add your own algorithm! :)


In [12]:
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession

In [14]:
# Configure your SparkContext. You can set the master explicitly; by default this will use a local master
# unless you call setMaster or change your environment variables.
conf = SparkConf().setAppName("intro-to-ml")
# We use get or create here so that if the cell is evaluated multiple times we don't get multiple SparkContexts.
sc = SparkContext.getOrCreate(conf)

In [15]:
# In Spark 2.x the SparkSession replaces the old SQLContext; we keep the sqlContext name for familiarity.
sqlContext = SparkSession.builder.getOrCreate()

Now we start by loading some data, which is in CSV format, so it's a good thing the csv data source is already included for us.

Note: the data is a modified version of https://archive.ics.uci.edu/ml/datasets/Adult


In [18]:
# If you're running this on a cluster you will need to copy the data file into HDFS or whatever cluster
# file system you are using
df = sqlContext.read.format("csv").option("header", "true").load("resources/adult.data")

In [19]:
df.cache()


Out[19]:
DataFrame[age: string, workclass: string, fnlwgt: string, education: string, education-num: string, maritial-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: string, capital-loss: string, hours-per-week: string, native-country: string, category: string]

In [20]:
df.head()


Out[20]:
Row(age=u'39', workclass=u' State-gov', fnlwgt=u' 77516', education=u' Bachelors', education-num=u' 13', maritial-status=u' Never-married', occupation=u' Adm-clerical', relationship=u' Not-in-family', race=u' White', sex=u' Male', capital-gain=u' 2174', capital-loss=u' 0', hours-per-week=u' 40', native-country=u' United-States', category=u' <=50K')

As we can see, Spark has simply loaded all of the values as strings since we haven't specified a schema. We can instead ask it to infer the schema, which also takes care of the extra leading spaces when parsing the numeric fields:


In [21]:
df = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load("resources/adult.data")

In [22]:
df.head()


Out[22]:
Row(age=39, workclass=u' State-gov', fnlwgt=77516.0, education=u' Bachelors', education-num=13.0, maritial-status=u' Never-married', occupation=u' Adm-clerical', relationship=u' Not-in-family', race=u' White', sex=u' Male', capital-gain=2174.0, capital-loss=0.0, hours-per-week=40.0, native-country=u' United-States', category=u' <=50K')

In [23]:
df.cache()


Out[23]:
DataFrame[age: int, workclass: string, fnlwgt: double, education: string, education-num: double, maritial-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: double, capital-loss: double, hours-per-week: double, native-country: string, category: string]

Spark has two different machine learning libraries. We will use the newer DataFrame-based one, which is defined in pyspark.ml. Let's import the basics as well as DecisionTreeClassifier :)


In [34]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.param import Param, Params
from pyspark.ml.feature import Bucketizer, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline

The first step is preparing the features; here we are just choosing a couple of existing numeric features:


In [35]:
assembler = VectorAssembler(inputCols=["age", "education-num"], outputCol="feautres")

Now the model only works on numeric (double) labels, so we need to take our category column and turn it into a double. The StringIndexer will do this for us:


In [36]:
indexer = StringIndexer(inputCol="category").setOutputCol("category-index")

In [37]:
pipeline = Pipeline().setStages([assembler, indexer])

We then need to "fit" our pipeline. This allows the StringIndexer to determine what strings will be assigned what indexes in the eventual transformation:


In [38]:
model = pipeline.fit(df)

We then transform our data into the prepared format for our machine learning model to work on:


In [39]:
prepared = model.transform(df)

In [40]:
prepared.head()


Out[40]:
Row(age=39, workclass=u' State-gov', fnlwgt=77516.0, education=u' Bachelors', education-num=13.0, maritial-status=u' Never-married', occupation=u' Adm-clerical', relationship=u' Not-in-family', race=u' White', sex=u' Male', capital-gain=2174.0, capital-loss=0.0, hours-per-week=40.0, native-country=u' United-States', category=u' <=50K', feautres=DenseVector([39.0, 13.0]), category-index=0.0)

In [41]:
dt = DecisionTreeClassifier(labelCol="category-index", featuresCol="feautres")

And now we fit on the prepared data:


In [42]:
dt_model = dt.fit(prepared)

In [43]:
dt_model


Out[43]:
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4941aa8ed4a004643060) of depth 5 with 61 nodes

But manually chaining these steps together makes it difficult to experiment. Instead we can put them together in a pipeline:


In [44]:
pipeline_and_model = Pipeline().setStages([assembler, indexer, dt])
pipeline_model = pipeline_and_model.fit(df)

In [45]:
dt_model.transform(prepared).select("prediction", "category-index").take(20)


Out[45]:
[Row(prediction=1.0, category-index=0.0),
 Row(prediction=1.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=1.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=1.0, category-index=1.0),
 Row(prediction=1.0, category-index=1.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=1.0, category-index=1.0)]

In [46]:
pipeline_model.transform(df).select("prediction", "category-index").take(20)


Out[46]:
[Row(prediction=1.0, category-index=0.0),
 Row(prediction=1.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=1.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=1.0, category-index=1.0),
 Row(prediction=1.0, category-index=1.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=1.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=0.0, category-index=0.0),
 Row(prediction=1.0, category-index=1.0)]

While this shows us the results, it isn't super easy to read. Thankfully we can map the prediction indexes back to the original label strings:


In [47]:
labels = list(pipeline_model.stages[1].labels)

In [48]:
from pyspark.ml.feature import IndexToString
inverter = IndexToString(inputCol="prediction", outputCol="prediction-label", labels=labels)

In [49]:
inverter.transform(pipeline_model.transform(df)).select("prediction-label", "category").take(20)


Out[49]:
[Row(prediction-label=u' >50K', category=u' <=50K'),
 Row(prediction-label=u' >50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' >50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' >50K'),
 Row(prediction-label=u' >50K', category=u' >50K'),
 Row(prediction-label=u' >50K', category=u' >50K'),
 Row(prediction-label=u' <=50K', category=u' >50K'),
 Row(prediction-label=u' <=50K', category=u' >50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' >50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' <=50K', category=u' <=50K'),
 Row(prediction-label=u' >50K', category=u' >50K')]

In [50]:
pipeline_model.stages[2]


Out[50]:
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4941aa8ed4a004643060) of depth 5 with 61 nodes

In [51]:
from pyspark.sql.functions import *
df.groupBy("age").agg(min("hours-per-week"), avg("hours-per-week"), max("capital-gain"))


Out[51]:
DataFrame[age: int, min(hours-per-week): double, avg(hours-per-week): double, max(capital-gain): double]

In [52]:
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("age").orderBy("capital-gain").rowsBetween(-100, 100)

In [53]:
df.select(df["age"], df['capital-gain'], avg("capital-gain").over(windowSpec)).orderBy(desc("capital-gain")).show()


+---+------------+----------------------------------------------------------------------------------------------------------------------------+
|age|capital-gain|avg(capital-gain) OVER (PARTITION BY age ORDER BY capital-gain ASC NULLS FIRST ROWS BETWEEN 100 PRECEDING AND 100 FOLLOWING)|
+---+------------+----------------------------------------------------------------------------------------------------------------------------+
| 47|     99999.0|                                                                                                            16675.3786407767|
| 44|     99999.0|                                                                                                           9770.615384615385|
| 44|     99999.0|                                                                                                            9865.47572815534|
| 44|     99999.0|                                                                                                           9962.196078431372|
| 44|     99999.0|                                                                                                          10060.831683168317|
| 28|     99999.0|                                                                                                           4736.762376237623|
| 28|     99999.0|                                                                                                           4690.323529411765|
| 22|     99999.0|                                                                                                           2644.735294117647|
| 65|     99999.0|                                                                                                           6030.961538461538|
| 22|     99999.0|                                                                                                           2670.920792079208|
| 65|     99999.0|                                                                                                           6089.514563106796|
| 53|     99999.0|                                                                                                           8464.336633663366|
| 53|     99999.0|                                                                                                            8381.35294117647|
| 31|     99999.0|                                                                                                           4793.683168316832|
| 65|     99999.0|                                                                                                            6149.21568627451|
| 53|     99999.0|                                                                                                           8220.173076923076|
| 34|     99999.0|                                                                                                           5285.564356435643|
| 65|     99999.0|                                                                                                          6210.0990099009905|
| 78|     99999.0|                                                                                                           5208.217391304348|
| 53|     99999.0|                                                                                                           8299.980582524271|
+---+------------+----------------------------------------------------------------------------------------------------------------------------+
only showing top 20 rows

Now it's over to you to take the next steps!

First:

  • Use Spark's built-in cross-validation support to see if a decision tree is actually a good model type for this data (a rough sketch follows below)
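
As a starting point, here is a minimal sketch of how cross-validation could be wired up, assuming CrossValidator from pyspark.ml.tuning and a BinaryClassificationEvaluator over the indexed label; the maxDepth grid values and numFolds are just illustrative choices, not recommendations:


In [ ]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate on the indexed label; areaUnderROC is the default metric.
evaluator = BinaryClassificationEvaluator(labelCol="category-index")

# A small, illustrative grid over tree depth -- adjust to taste.
grid = ParamGridBuilder().addGrid(dt.maxDepth, [3, 5, 10]).build()

cv = CrossValidator(estimator=pipeline_and_model,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    numFolds=3)
cv_model = cv.fit(df)
# One average metric per param map in the grid.
print(cv_model.avgMetrics)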

After you've figured out how to measure effectiveness, it's time to see what we can do to improve:

  • What happens if we add additional features? Can we improve the DT?
  • What about different model types? (see the sketch after this list)
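
For example, here is a rough sketch of both ideas at once: indexing the workclass column, assembling a wider feature vector, and swapping in a RandomForestClassifier. The choice of extra columns and numTrees=20 are illustrative assumptions, not the "right" answer:


In [ ]:
from pyspark.ml.classification import RandomForestClassifier

# Index a categorical column so it can be used as a feature.
workclass_indexer = StringIndexer(inputCol="workclass", outputCol="workclass-index")

# Assemble a wider feature vector from numeric columns plus the indexed categorical one.
wide_assembler = VectorAssembler(
    inputCols=["age", "education-num", "hours-per-week", "capital-gain",
               "capital-loss", "workclass-index"],
    outputCol="wide-features")

# numTrees=20 is an arbitrary illustrative value.
rf = RandomForestClassifier(labelCol="category-index", featuresCol="wide-features", numTrees=20)

rf_pipeline = Pipeline().setStages([workclass_indexer, wide_assembler, indexer, rf])
rf_model = rf_pipeline.fit(df)
rf_model.transform(df).select("prediction", "category-index").show(5)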

In [ ]: